package demo.servicemix.activemq;

import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Small convenience wrapper around one ActiveMQ JMS {@link Connection} and one
 * non-transacted, auto-acknowledging {@link Session}.
 *
 * <p>Typical usage: construct, call {@link #init()} (or {@link #init(String)}
 * when a client ID is needed, e.g. for durable subscriptions), create
 * producers / consumers / messages, and finally call {@link #destroy()}.
 *
 * <p>NOTE(review): everything created through this class shares the single
 * session held here; JMS sessions are meant to be used by one thread at a
 * time, so confirm that callers do not share an instance across threads.
 */
public class MQContext {
	/** Broker URL of the first node. */
	public static String jmsUrl_11 = "tcp://192.168.190.11:61616";
	/** Broker URL of the second node. */
	public static String jmsUrl_12 = "tcp://192.168.190.12:61616";
	/** Failover URL listing both brokers above, with the transport options given in its query string. */
	public static String jmsUrl_all = "failover:(" + jmsUrl_11 + "," + jmsUrl_12
			+ ")?backup=true&timeout=5000&randomize=false&jms.useAsyncSend=true";

	/** Kind of JMS destination handled by this context. */
	public static enum Type {
		Queue, Topic
	}

	private final String userName;
	private final String password;

	private final ActiveMQConnectionFactory factory;
	private Connection connection;
	private Session session;
	/** Destinations already created on the current session, keyed by {@code <type>_<name>}. */
	private final Map<String, Destination> destCache = new ConcurrentHashMap<String, Destination>();

	/**
	 * Creates a context for the given broker URL. No connection is opened
	 * until {@link #init()} is called.
	 *
	 * @param url      broker URL handed to {@link ActiveMQConnectionFactory}
	 * @param userName user name used when the connection is created
	 * @param password password used when the connection is created
	 */
	public MQContext(String url, String userName, String password) {
		this.userName = userName;
		this.password = password;
		factory = new ActiveMQConnectionFactory(url);
	}

	/**
	 * Creates a context for the given broker URL using empty credentials.
	 *
	 * @param url broker URL handed to {@link ActiveMQConnectionFactory}
	 */
	public MQContext(String url) {
		this(url, "", "");
	}

	/**
	 * Opens the connection and session without setting a client ID.
	 *
	 * @throws Exception if the connection or session cannot be created
	 */
	public void init() throws Exception {
		init(null);
	}

	/**
	 * Opens and starts the connection and creates the session. Does nothing
	 * if this context is already initialized.
	 *
	 * @param clientId client ID to set on the connection, or {@code null} to leave it unset
	 * @throws Exception if any step fails; in that case the half-opened
	 *                   connection is closed and this context stays
	 *                   uninitialized, so {@code init} may be retried
	 */
	public void init(String clientId) throws Exception {
		if (connection != null) {
			return;
		}
		Connection newConnection = factory.createConnection(userName, password);
		try {
			if (clientId != null) {
				newConnection.setClientID(clientId);
			}
			newConnection.start();
			Session newSession = newConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			// Publish the fields only once every step succeeded, so a failed
			// init never leaves a connection without a session behind.
			connection = newConnection;
			session = newSession;
		} catch (Exception e) {
			try {
				newConnection.close();
			} catch (Exception ignored) {
				// Best-effort cleanup; the original failure is the one worth reporting.
			}
			throw e;
		}
	}

	/**
	 * Returns the live session or fails with a descriptive error.
	 *
	 * @throws IllegalStateException if {@link #init()} has not been called
	 *                               successfully or {@link #destroy()} has been called since
	 */
	private Session requireSession() {
		if (session == null) {
			throw new IllegalStateException("MQContext is not initialized; call init() first");
		}
		return session;
	}

	/**
	 * Looks up the destination in the cache, creating and caching it on the
	 * current session when absent.
	 *
	 * @param type queue or topic
	 * @param name destination name
	 * @return the cached or newly created destination
	 * @throws Exception if the session fails to create the destination
	 */
	private Destination getDestination(Type type, String name) throws Exception {
		String key = type + "_" + name;
		Destination destination = destCache.get(key);
		if (destination == null) {
			Session current = requireSession();
			if (type == Type.Queue) {
				destination = current.createQueue(name);
			} else if (type == Type.Topic) {
				destination = current.createTopic(name);
			} else {
				throw new IllegalArgumentException("Unsupported destination type: " + type);
			}
			destCache.put(key, destination);
		}
		return destination;
	}

	/**
	 * Creates a producer for the named queue.
	 *
	 * @param queueName name of the queue
	 * @return a new producer bound to that queue
	 * @throws Exception if the producer cannot be created
	 */
	public MessageProducer createQueueProducer(String queueName) throws Exception {
		return requireSession().createProducer(getDestination(Type.Queue, queueName));
	}

	/**
	 * Creates a producer for the named topic.
	 *
	 * @param topicName name of the topic
	 * @return a new producer bound to that topic
	 * @throws Exception if the producer cannot be created
	 */
	public MessageProducer createTopicProducer(String topicName) throws Exception {
		return requireSession().createProducer(getDestination(Type.Topic, topicName));
	}

	/**
	 * Creates a consumer for the named queue.
	 *
	 * @param queueName       name of the queue
	 * @param messageSelector JMS message selector, or {@code null} for none
	 * @return a new consumer on that queue
	 * @throws Exception if the consumer cannot be created
	 */
	public MessageConsumer createQueueConsumer(String queueName, String messageSelector) throws Exception {
		Session current = requireSession();
		if (messageSelector == null) {
			return current.createConsumer(getDestination(Type.Queue, queueName));
		} else {
			return current.createConsumer(getDestination(Type.Queue, queueName), messageSelector);
		}
	}

	/**
	 * Creates a (non-durable) consumer for the named topic.
	 *
	 * @param topicName       name of the topic
	 * @param messageSelector JMS message selector, or {@code null} for none
	 * @return a new consumer on that topic
	 * @throws Exception if the consumer cannot be created
	 */
	public MessageConsumer createTopicConsumer(String topicName, String messageSelector) throws Exception {
		Session current = requireSession();
		if (messageSelector == null) {
			return current.createConsumer(getDestination(Type.Topic, topicName));
		} else {
			return current.createConsumer(getDestination(Type.Topic, topicName), messageSelector);
		}
	}

	/**
	 * Creates a consumer for the named queue without a message selector.
	 *
	 * @param queueName name of the queue
	 * @return a new consumer on that queue
	 * @throws Exception if the consumer cannot be created
	 */
	public MessageConsumer createQueueConsumer(String queueName) throws Exception {
		return createQueueConsumer(queueName, null);
	}

	/**
	 * Creates a (non-durable) consumer for the named topic without a message selector.
	 *
	 * @param topicName name of the topic
	 * @return a new consumer on that topic
	 * @throws Exception if the consumer cannot be created
	 */
	public MessageConsumer createTopicConsumer(String topicName) throws Exception {
		return createTopicConsumer(topicName, null);
	}

	/**
	 * Creates a durable subscriber on the named topic.
	 *
	 * @param topicName       name of the topic
	 * @param subscribeName   name identifying the durable subscription
	 * @param messageSelector JMS message selector, or {@code null} for none
	 * @return a new durable subscriber
	 * @throws Exception if the subscriber cannot be created
	 */
	public MessageConsumer createDurableSubscribe(String topicName, String subscribeName, String messageSelector)
			throws Exception {
		Session current = requireSession();
		Topic topic = (Topic) getDestination(Type.Topic, topicName);
		if (messageSelector == null) {
			return current.createDurableSubscriber(topic, subscribeName);
		} else {
			// NOTE(review): this overload passes noLocal=true, whereas the
			// selector-less branch above uses the two-argument overload.
			// Confirm the difference is intended before relying on it.
			return current.createDurableSubscriber(topic, subscribeName, messageSelector, true);
		}
	}

	/**
	 * Creates a durable subscriber on the named topic without a message selector.
	 *
	 * @param topicName     name of the topic
	 * @param subscribeName name identifying the durable subscription
	 * @return a new durable subscriber
	 * @throws Exception if the subscriber cannot be created
	 */
	public MessageConsumer createDurableSubscribe(String topicName, String subscribeName) throws Exception {
		return createDurableSubscribe(topicName, subscribeName, null);
	}

	/**
	 * Closes the session and connection and clears the destination cache.
	 * Every step is best-effort: failures are ignored so that the remaining
	 * resources are still released. After this call the context can be
	 * initialized again with {@link #init()}.
	 */
	public void destroy() {
		if (session != null) {
			try {
				session.close();
			} catch (Exception ignored) {
				// Best-effort shutdown: keep going so the connection is still closed.
			}
			session = null;
		}
		if (connection != null) {
			try {
				connection.stop();
			} catch (Exception ignored) {
				// Best-effort shutdown: still attempt close() below.
			}
			try {
				connection.close();
			} catch (Exception ignored) {
				// Nothing more can be done with a connection that fails to close.
			}
			connection = null;
		}
		// Cached destinations were created on the session that is now gone.
		destCache.clear();
	}

	/**
	 * Creates an empty JMS message on the current session.
	 *
	 * @return a new message
	 * @throws Exception if the message cannot be created
	 */
	public Message createMessage() throws Exception {
		return requireSession().createMessage();
	}

	/**
	 * Creates a text message on the current session.
	 *
	 * @param text body of the message
	 * @return a new text message
	 * @throws Exception if the message cannot be created
	 */
	public TextMessage createTextMessage(String text) throws Exception {
		return requireSession().createTextMessage(text);
	}

	/**
	 * Creates an empty bytes message on the current session.
	 *
	 * @return a new bytes message
	 * @throws Exception if the message cannot be created
	 */
	public BytesMessage createBytesMessage() throws Exception {
		return requireSession().createBytesMessage();
	}

	/**
	 * Creates an object message on the current session.
	 *
	 * @param obj serializable payload of the message
	 * @return a new object message
	 * @throws Exception if the message cannot be created
	 */
	public ObjectMessage createObjectMessage(Serializable obj) throws Exception {
		return requireSession().createObjectMessage(obj);
	}

	/**
	 * Creates an empty map message on the current session.
	 *
	 * @return a new map message
	 * @throws Exception if the message cannot be created
	 */
	public MapMessage createMapMessage() throws Exception {
		return requireSession().createMapMessage();
	}
}
